Skip to content

Commit a4b2b49

Browse files
makinje16timvaillancourtGuptaManan100dbussinktwthorn
committed
VStream API: allow keyspace-level heartbeats to be streamed (vitessio#16593) (#620)
* VStream API: allow keyspace-level heartbeats to be streamed (vitessio#16593) Signed-off-by: Malcolm Akinje <[email protected]> * `slack-19.0` backport v22 `vtorc` optimizations + stats, part 3 (#618) * Remove unused code in discovery queue creation (vitessio#17515) Signed-off-by: Manan Gupta <[email protected]> * vtorc: Cleanup unused code (vitessio#15508) Signed-off-by: Dirkjan Bussink <[email protected]> * `vtorc`: cleanup discover queue, add concurrency flag (vitessio#17825) Signed-off-by: Tim Vaillancourt <[email protected]> * `vtorc`: add tablets watched stats Signed-off-by: Tim Vaillancourt <[email protected]> * fix missing merge conflict update Signed-off-by: Tim Vaillancourt <[email protected]> * `vtorc`: skip unnecessary `inst.ReadTablet` in `logic.LockShard(...)` Signed-off-by: Tim Vaillancourt <[email protected]> * `vtorc`: use `errgroup` in keyspace/shard discovery Signed-off-by: Tim Vaillancourt <[email protected]> * fix import Signed-off-by: Tim Vaillancourt <[email protected]> * fix ineffassign Signed-off-by: Tim Vaillancourt <[email protected]> * missing import Signed-off-by: Tim Vaillancourt <[email protected]> * `vtorc`: add stats for discovery workers Signed-off-by: Tim Vaillancourt <[email protected]> * get count from backend Signed-off-by: Tim Vaillancourt <[email protected]> * rm unused map Signed-off-by: Tim Vaillancourt <[email protected]> --------- Signed-off-by: Manan Gupta <[email protected]> Signed-off-by: Dirkjan Bussink <[email protected]> Signed-off-by: Tim Vaillancourt <[email protected]> Co-authored-by: Manan Gupta <[email protected]> Co-authored-by: Dirkjan Bussink <[email protected]> * Bp pr 17558 pr 17858.slack19.0 (#615) * VReplication: Improve error handling in VTGate VStreams (vitessio#17558) Signed-off-by: Tom Thornton <[email protected]> * Backport vitessio#17858 --------- Signed-off-by: Tom Thornton <[email protected]> * `slack-19.0`: re-backport tweaks from vitessio#17911 (#621) * fix bug in reverse `if` Signed-off-by: 
Tim Vaillancourt <[email protected]> * simplify Signed-off-by: Tim Vaillancourt <[email protected]> * add `ReadTabletCountsByShard` test Signed-off-by: Tim Vaillancourt <[email protected]> * use map of map Signed-off-by: Tim Vaillancourt <[email protected]> * capitalize Cell Signed-off-by: Tim Vaillancourt <[email protected]> * gofmt lint Signed-off-by: Tim Vaillancourt <[email protected]> * fix plural in names Signed-off-by: Tim Vaillancourt <[email protected]> --------- Signed-off-by: Tim Vaillancourt <[email protected]> --------- Signed-off-by: Malcolm Akinje <[email protected]> Signed-off-by: Manan Gupta <[email protected]> Signed-off-by: Dirkjan Bussink <[email protected]> Signed-off-by: Tim Vaillancourt <[email protected]> Signed-off-by: Tom Thornton <[email protected]> Signed-off-by: Malcolm Akinje <[email protected]> Co-authored-by: Tim Vaillancourt <[email protected]> Co-authored-by: Manan Gupta <[email protected]> Co-authored-by: Dirkjan Bussink <[email protected]> Co-authored-by: Tom Thornton <[email protected]>
1 parent e1e335e commit a4b2b49

32 files changed

+1690
-554
lines changed

go/test/endtoend/vreplication/cluster_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ type ClusterConfig struct {
8787
vtorcPort int
8888

8989
vreplicationCompressGTID bool
90+
// Set overrideHeartbeatOptions to true to skip the default tablet heartbeat options
91+
// (--heartbeat_on_demand_duration=5s and --heartbeat_interval=250ms) so a test can supply its own.
92+
overrideHeartbeatOptions bool
9093
}
9194

9295
// enableGTIDCompression enables GTID compression for the cluster and returns a function
@@ -522,9 +525,16 @@ func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace,
522525

523526
options := []string{
524527
"--queryserver-config-schema-reload-time", "5s",
528+
} // FIXME: for multi-cell initial schema doesn't seem to load without "--queryserver-config-schema-reload-time"
529+
530+
defaultHeartbeatOptions := []string{
525531
"--heartbeat_on_demand_duration", "5s",
526532
"--heartbeat_interval", "250ms",
527-
} // FIXME: for multi-cell initial schema doesn't seem to load without "--queryserver-config-schema-reload-time"
533+
}
534+
if !mainClusterConfig.overrideHeartbeatOptions {
535+
options = append(options, defaultHeartbeatOptions...)
536+
}
537+
528538
options = append(options, extraVTTabletArgs...)
529539

530540
if mainClusterConfig.vreplicationCompressGTID {

go/test/endtoend/vreplication/vstream_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -946,3 +946,139 @@ func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) {
946946
require.NotZero(t, ne.numDash40Events)
947947
require.NotZero(t, ne.num40DashEvents)
948948
}
949+
950+
const (
	// vstreamHeartbeatsTestContextTimeout bounds how long doVStream keeps the
	// stream open while it counts events.
	vstreamHeartbeatsTestContextTimeout = 20 * time.Second
	// numExpectedHeartbeats is the minimum number of heartbeat row events that
	// TestVStreamHeartbeats requires when keyspace heartbeats are streamed.
	// With the 1s heartbeat interval that test configures, roughly one
	// heartbeat per second of the timeout would be ideal; 10 is a deliberately
	// conservative floor to avoid CI flakiness.
	numExpectedHeartbeats = 10
)
956+
957+
func doVStream(t *testing.T, vc *VitessCluster, flags *vtgatepb.VStreamFlags) (numRowEvents map[string]int, numFieldEvents map[string]int) {
958+
// Stream for a while to ensure heartbeats are sent.
959+
ctx, cancel := context.WithTimeout(context.Background(), vstreamHeartbeatsTestContextTimeout)
960+
defer cancel()
961+
962+
numRowEvents = make(map[string]int)
963+
numFieldEvents = make(map[string]int)
964+
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
965+
require.NoError(t, err)
966+
defer vstreamConn.Close()
967+
968+
done := false
969+
vgtid := &binlogdatapb.VGtid{
970+
ShardGtids: []*binlogdatapb.ShardGtid{{
971+
Keyspace: "product",
972+
Shard: "0",
973+
Gtid: "",
974+
}}}
975+
976+
filter := &binlogdatapb.Filter{
977+
Rules: []*binlogdatapb.Rule{{
978+
Match: "customer",
979+
Filter: "select * from customer",
980+
}},
981+
}
982+
// Stream events from the VStream API.
983+
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
984+
require.NoError(t, err)
985+
for !done {
986+
evs, err := reader.Recv()
987+
switch err {
988+
case nil:
989+
for _, ev := range evs {
990+
switch ev.Type {
991+
case binlogdatapb.VEventType_ROW:
992+
rowEvent := ev.RowEvent
993+
arr := strings.Split(rowEvent.TableName, ".")
994+
require.Equal(t, len(arr), 2)
995+
tableName := arr[1]
996+
require.Equal(t, "product", rowEvent.Keyspace)
997+
require.Equal(t, "0", rowEvent.Shard)
998+
numRowEvents[tableName]++
999+
1000+
case binlogdatapb.VEventType_FIELD:
1001+
fieldEvent := ev.FieldEvent
1002+
arr := strings.Split(fieldEvent.TableName, ".")
1003+
require.Equal(t, len(arr), 2)
1004+
tableName := arr[1]
1005+
require.Equal(t, "product", fieldEvent.Keyspace)
1006+
require.Equal(t, "0", fieldEvent.Shard)
1007+
numFieldEvents[tableName]++
1008+
default:
1009+
}
1010+
}
1011+
case io.EOF:
1012+
log.Infof("Stream Ended")
1013+
done = true
1014+
default:
1015+
log.Errorf("remote error: %v", err)
1016+
done = true
1017+
}
1018+
}
1019+
return numRowEvents, numFieldEvents
1020+
}
1021+
1022+
// TestVStreamHeartbeats enables streaming of the internal Vitess heartbeat tables in the VStream API and
1023+
// ensures that the heartbeat events are received as expected by the client.
1024+
func TestVStreamHeartbeats(t *testing.T) {
1025+
// Enable continuous heartbeats.
1026+
extraVTTabletArgs = append(extraVTTabletArgs,
1027+
"--heartbeat_enable",
1028+
"--heartbeat_interval", "1s",
1029+
"--heartbeat_on_demand_duration", "0",
1030+
)
1031+
setSidecarDBName("_vt")
1032+
config := *mainClusterConfig
1033+
config.overrideHeartbeatOptions = true
1034+
vc = NewVitessCluster(t, &clusterOptions{
1035+
clusterConfig: &config,
1036+
})
1037+
defer vc.TearDown()
1038+
1039+
require.NotNil(t, vc)
1040+
defaultReplicas = 0
1041+
defaultRdonly = 0
1042+
1043+
defaultCell := vc.Cells[vc.CellNames[0]]
1044+
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema,
1045+
defaultReplicas, defaultRdonly, 100, nil)
1046+
verifyClusterHealth(t, vc)
1047+
insertInitialData(t)
1048+
1049+
expectedNumRowEvents := make(map[string]int)
1050+
expectedNumRowEvents["customer"] = 3 // 3 rows inserted in the customer table in insertInitialData()
1051+
1052+
type testCase struct {
1053+
name string
1054+
flags *vtgatepb.VStreamFlags
1055+
expectedHeartbeats int
1056+
}
1057+
testCases := []testCase{
1058+
{
1059+
name: "With Keyspace Heartbeats On",
1060+
flags: &vtgatepb.VStreamFlags{
1061+
StreamKeyspaceHeartbeats: true,
1062+
},
1063+
expectedHeartbeats: numExpectedHeartbeats,
1064+
},
1065+
{
1066+
name: "With Keyspace Heartbeats Off",
1067+
flags: nil,
1068+
expectedHeartbeats: 0,
1069+
},
1070+
}
1071+
1072+
for _, tc := range testCases {
1073+
t.Run(tc.name, func(t *testing.T) {
1074+
gotNumRowEvents, gotNumFieldEvents := doVStream(t, vc, tc.flags)
1075+
for k := range expectedNumRowEvents {
1076+
require.Equalf(t, 1, gotNumFieldEvents[k], "incorrect number of field events for table %s, got %d", k, gotNumFieldEvents[k])
1077+
}
1078+
require.GreaterOrEqual(t, gotNumRowEvents["heartbeat"], tc.expectedHeartbeats, "incorrect number of heartbeat events received")
1079+
log.Infof("Total number of heartbeat events received: %v", gotNumRowEvents["heartbeat"])
1080+
delete(gotNumRowEvents, "heartbeat")
1081+
require.Equal(t, expectedNumRowEvents, gotNumRowEvents)
1082+
})
1083+
}
1084+
}

0 commit comments

Comments
 (0)